package cn.net.scp.backperssure;

import com.google.common.util.concurrent.RateLimiter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accepts submitted {@link Task}s and runs them on a bounded worker pool.
 *
 * <p>Submissions are rate-limited, de-duplicated by task name while a calculation is in flight,
 * and answered from a result cache once a calculation with the same name has completed
 * successfully. The caches are {@link ConcurrentHashMap}s and the in-flight registration is
 * atomic, so {@link #submit(Task)} may be called from several threads.
 */
public class Broker {

    private static final Logger LOGGER = LoggerFactory.getLogger(Broker.class);
    /** Core and maximum size of the worker pool. */
    private static final int WORKERS_NUMBER = 10;
    /** Size argument passed to the {@code StackBlockingQueue} that backs the worker pool. */
    private static final int SUBMITTED_TASKS_QUEUE_SIZE = 20;

    /** Static, therefore shared by every {@code Broker} instance: 100 permits per second. */
    private static final RateLimiter RATE_LIMITER = RateLimiter.create(100);

    private final ExecutorService executorService = initializeThreadPoolWithRejection();
    /** Successfully calculated results, keyed by {@link Result#getTaskName()}. */
    private final Map<String, Result> calculationCache = new ConcurrentHashMap<>();
    /** Futures of calculations that are currently in flight, keyed by {@link Task#getName()}. */
    private final Map<String, CompletableFuture<Result>> submittedTasksCache = new ConcurrentHashMap<>();

    /**
     * Submits a task for asynchronous calculation.
     *
     * <ul>
     *   <li>If no rate-limiter permit becomes available within 20 ms, a future failed with
     *       {@link IllegalAccessException} is returned.</li>
     *   <li>If a result for the task name is already cached, a completed future is returned.</li>
     *   <li>If a calculation for the task name is already in flight, its future is returned.</li>
     *   <li>Otherwise the task is handed to the worker pool; if the pool rejects it, the
     *       returned future is failed with the rejection exception.</li>
     * </ul>
     *
     * @param task the task to calculate
     * @return a future that yields the task's result or reports why it could not be produced
     */
    public CompletableFuture<Result> submit(final Task task) {
        if (!RATE_LIMITER.tryAcquire(1, 20, TimeUnit.MILLISECONDS)) {
            LOGGER.warn("Rate limit. Failed to submit a task: {}.", task.getName());
            // NOTE: the exception type is kept as-is because callers may already match on it.
            return CompletableFuture.failedFuture(new IllegalAccessException("Rate limit."));
        }

        final String taskName = task.getName();

        // Check already calculated results.
        final Result resultCached = calculationCache.get(taskName);
        if (resultCached != null) {
            return CompletableFuture.completedFuture(resultCached);
        }

        // Atomically claim the in-flight slot so that concurrent submissions of the same task
        // name cannot both start a calculation.
        final CompletableFuture<Result> promise = new CompletableFuture<>();
        final CompletableFuture<Result> inFlight = submittedTasksCache.putIfAbsent(taskName, promise);
        if (inFlight != null) {
            LOGGER.info("Rejecting a task {} because it was already submitted.", taskName);
            return inFlight;
        }

        LOGGER.info("Calculation submitted: {}.", taskName);
        try {
            // whenComplete (unlike thenAccept) also fires on failure, so the in-flight entry is
            // always released. The outcome is forwarded unchanged to the promise.
            CompletableFuture.supplyAsync(task::calculate, executorService)
                    .whenComplete((result, failure) -> settle(taskName, promise, result, failure));
        } catch (RuntimeException e) {
            // The pool refused the task (queue full or broker closed).
            LOGGER.warn("Failed to submit a task: {}.", taskName, e);
            settle(taskName, promise, null, e);
        }
        return promise;
    }

    /**
     * Records the outcome of a calculation: caches a successful result, releases the in-flight
     * slot and finally completes the future handed out to callers.
     *
     * @param taskName key under which {@code promise} was registered in the in-flight map
     * @param promise  the future returned to the submitters of this task
     * @param result   the calculated result, or {@code null} on failure
     * @param failure  the failure, or {@code null} on success
     */
    private void settle(final String taskName, final CompletableFuture<Result> promise,
                        final Result result, final Throwable failure) {
        try {
            if (failure == null && result != null) {
                // Publish the result before releasing the in-flight slot so that a finished
                // task is visible in the result cache before it leaves the in-flight map.
                updateCalculatedResultsCache(result);
            }
        } finally {
            // Remove only our own entry; the key is the submission key, not the result's name,
            // so the slot is released even for a null result.
            submittedTasksCache.remove(taskName, promise);
            if (failure != null) {
                promise.completeExceptionally(failure);
            } else {
                promise.complete(result);
            }
        }
    }

    /** Stores a calculated result under the task name reported by the result itself. */
    private void updateCalculatedResultsCache(final Result result) {
        calculationCache.put(result.getTaskName(), result);
    }

    /**
     * Initiates an orderly shutdown of the worker pool: already accepted tasks still run, new
     * submissions are rejected and reported through a failed future by {@link #submit(Task)}.
     */
    public void close() {
        // No new tasks will be accepted.
        executorService.shutdown();
    }

    /**
     * Creates the fixed-size worker pool. With {@link ThreadPoolExecutor.AbortPolicy} a task
     * that cannot be accepted makes the submitting call throw instead of blocking the caller.
     */
    private static ExecutorService initializeThreadPoolWithRejection() {
        final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
        final BlockingQueue<Runnable> queue = new StackBlockingQueue<>(SUBMITTED_TASKS_QUEUE_SIZE);
        return new ThreadPoolExecutor(WORKERS_NUMBER, WORKERS_NUMBER, 0L, TimeUnit.MINUTES, queue, handler);
    }
}
